5.5 Human-in-the-loop
本节介绍 LangChain 中的人机协作机制,实现对敏感操作的人工监督。
什么是 Human-in-the-loop?
Human-in-the-loop(HITL,人机协作) 是一种中间件机制,在 Agent 执行潜在风险操作前暂停,等待人工审批。
当模型提出可能有风险的操作(如写入文件、执行 SQL)时,系统会暂停执行并请求人工批准。
三种决策类型
| 决策 | 说明 | 图标 |
|---|---|---|
| Approve | 批准执行,不做修改 | ✅ |
| Edit | 修改参数后执行 | ✏️ |
| Reject | 拒绝执行,附带反馈 | ❌ |
基本配置
python
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
agent = create_agent(
model="gpt-4o",
tools=[write_file, execute_sql, read_data],
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={
"write_file": True, # 始终需要审批
"execute_sql": { # 自定义配置
"allowed_decisions": ["approve", "reject"]
},
"read_data": False, # 不需要审批
},
description_prefix="工具执行等待审批",
),
],
checkpointer=InMemorySaver(), # 必需:保存中断状态
)工作原理
用户请求
│
▼
┌─────────────────┐
│ Agent 推理 │
└─────────────────┘
│
▼
┌─────────────────┐
│ 模型调用工具 │
└─────────────────┘
│
▼
┌─────────────────┐ 需要审批?
│ HITL 中间件 │────────────────┐
└─────────────────┘ │
│ │
│ 不需要 │ 需要
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ 直接执行 │ │ 暂停执行 │
└─────────────────┘ │ 等待审批 │
└─────────────────┘
│
┌──────────┼──────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│ Approve│ │ Edit │ │ Reject │
└────────┘ └────────┘ └────────┘
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│ 执行 │ │修改执行│ │ 终止 │
└────────┘ └────────┘ └────────┘完整示例
1. 定义工具
python
from langchain_core.tools import tool
@tool
def write_file(filename: str, content: str) -> str:
"""写入文件"""
with open(filename, "w") as f:
f.write(content)
return f"已写入文件: {filename}"
@tool
def execute_sql(query: str) -> str:
"""执行 SQL 查询"""
# 实际应用中连接数据库
return f"执行 SQL: {query}"
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""发送邮件"""
return f"已发送邮件到 {to}"
@tool
def read_data(source: str) -> str:
"""读取数据(安全操作)"""
return f"读取数据: {source}"2. 创建 Agent
python
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
agent = create_agent(
model="gpt-4o",
tools=[write_file, execute_sql, send_email, read_data],
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={
"write_file": True,
"execute_sql": True,
"send_email": {
"allowed_decisions": ["approve", "edit", "reject"],
"description": "发送邮件需要确认收件人和内容"
},
"read_data": False,
},
description_prefix="敏感操作需要您的批准",
),
],
checkpointer=checkpointer,
)3. 执行与审批
python
# 初始调用
config = {"configurable": {"thread_id": "session_1"}}
result = agent.invoke(
{"messages": [{"role": "user", "content": "删除 test.txt 文件"}]},
config=config
)
# 检查是否有中断
if result.get("__interrupt__"):
interrupt = result["__interrupt__"]
print(f"等待审批: {interrupt}")
# 人工决策
decision = input("批准? (y/n/e): ")
if decision == "y":
# 批准执行
result = agent.invoke(
{"decision": "approve"},
config=config
)
elif decision == "e":
# 编辑后执行
new_args = {"filename": "safe_test.txt"}
result = agent.invoke(
{"decision": "edit", "new_args": new_args},
config=config
)
else:
# 拒绝执行
result = agent.invoke(
{"decision": "reject", "feedback": "不允许删除文件"},
config=config
)
print(result["messages"][-1].content)高级配置
条件性审批
python
from langchain.agents.middleware import HumanInTheLoopMiddleware
def should_interrupt(tool_name: str, tool_args: dict, state: dict) -> bool:
"""动态决定是否需要审批"""
# SQL DELETE 操作需要审批
if tool_name == "execute_sql":
query = tool_args.get("query", "").upper()
if "DELETE" in query or "DROP" in query:
return True
return False
# 发送到外部邮箱需要审批
if tool_name == "send_email":
to = tool_args.get("to", "")
if not to.endswith("@company.com"):
return True
return False
return False
middleware = HumanInTheLoopMiddleware(
interrupt_condition=should_interrupt,
)自定义审批回调
python
def approval_callback(tool_name: str, tool_args: dict, state: dict) -> dict:
"""自定义审批逻辑"""
print(f"\n{'='*50}")
print(f"工具: {tool_name}")
print(f"参数: {tool_args}")
print(f"{'='*50}\n")
response = input("决策 (approve/edit/reject): ").strip().lower()
if response == "approve":
return {"decision": "approve"}
elif response == "edit":
# 收集新参数
new_args = {}
for key, value in tool_args.items():
new_value = input(f"{key} [{value}]: ").strip()
new_args[key] = new_value if new_value else value
return {"decision": "edit", "new_args": new_args}
else:
feedback = input("拒绝原因: ").strip()
return {"decision": "reject", "feedback": feedback}
middleware = HumanInTheLoopMiddleware(
interrupt_on={"write_file": True, "send_email": True},
approval_callback=approval_callback,
)与 Web 应用集成
Flask 示例
python
from flask import Flask, request, jsonify
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
app = Flask(__name__)
checkpointer = InMemorySaver()
agent = create_agent(
model="gpt-4o",
tools=[write_file, send_email],
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={"write_file": True, "send_email": True},
),
],
checkpointer=checkpointer,
)
pending_approvals = {}
@app.route("/chat", methods=["POST"])
def chat():
data = request.json
thread_id = data.get("thread_id", "default")
message = data.get("message")
config = {"configurable": {"thread_id": thread_id}}
result = agent.invoke(
{"messages": [{"role": "user", "content": message}]},
config=config
)
if result.get("__interrupt__"):
interrupt = result["__interrupt__"]
pending_approvals[thread_id] = interrupt
return jsonify({
"status": "pending_approval",
"interrupt": interrupt,
})
return jsonify({
"status": "complete",
"response": result["messages"][-1].content,
})
@app.route("/approve", methods=["POST"])
def approve():
data = request.json
thread_id = data.get("thread_id")
decision = data.get("decision")
config = {"configurable": {"thread_id": thread_id}}
payload = {"decision": decision}
if decision == "edit":
payload["new_args"] = data.get("new_args", {})
elif decision == "reject":
payload["feedback"] = data.get("feedback", "")
result = agent.invoke(payload, config=config)
del pending_approvals[thread_id]
return jsonify({
"status": "complete",
"response": result["messages"][-1].content,
})Checkpointer 要求
HITL 中间件必须配合 Checkpointer 使用,以保存中断状态:
| Checkpointer | 用途 |
|---|---|
InMemorySaver | 开发测试 |
SqliteSaver | 单机生产 |
AsyncPostgresSaver | 分布式生产 |
python
# 开发环境
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()
# 生产环境
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
checkpointer = AsyncPostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/db"
)最佳实践
| 实践 | 说明 |
|---|---|
| 明确列出敏感工具 | 显式标记需要审批的工具 |
| 提供清晰描述 | 审批请求应包含足够上下文 |
| 设置超时 | 避免无限等待审批 |
| 记录审批日志 | 保留审计轨迹 |
| 使用持久化 Checkpointer | 生产环境使用数据库存储 |
适用场景
| 场景 | 需要审批的操作 |
|---|---|
| 金融应用 | 转账、交易、账户修改 |
| 内容管理 | 发布、删除、修改内容 |
| 系统管理 | 文件操作、配置变更 |
| 通信应用 | 发送邮件、消息 |
| 数据操作 | DELETE、UPDATE、DROP |
上一节:5.4 Model Context Protocol MCP
下一节:5.6 Multi-agent